package com.katesoft.scale4j.rttp.jobs;

import static com.katesoft.scale4j.rttp.client.ISpringContextAccessor.ACCESSOR;

import java.io.Serializable;

import org.perf4j.StopWatch;
import org.perf4j.log4j.Log4JStopWatch;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;

import com.katesoft.scale4j.log.LogFactory;
import com.katesoft.scale4j.log.Logger;

/**
 * This class is bridge between quartz scheduler and spring batch.
 * <p/>
 * Quartz will fire up distributed job execution and this class will copy job properties and will
 * fire spring batch job execution.
 * <p/>
 * 2 properties must be part of JobExecutionContext('rttp.jobLauncher', 'rttp.jobLocator') - if
 * client does not provide, those 2 properties will be injected by framework, and there is 1
 * optional property 'rttp.jobParametersIncrementer'(reference to JobParametersIncrementer bean).
 * <p/>
 * Please refer to http://static.springsource.org/spring-batch/reference/html-single/index.html#
 * JobParametersIncrementer for more details.
 * 
 * @author Dave Syer
 * @author kate2007
 */
public class SpringBatchJobLauncher implements Job, Serializable {
   private static final long serialVersionUID = -8464015735898762648L;
   private static final Logger LOGGER = LogFactory.getLogger(SpringBatchJobLauncher.class);
   //
   public static final String JOB_LAUNCHER = "rttp.jobLauncher";
   public static final String JOB_LOCATOR = "rttp.jobLocator";
   public static final String JOB_PARAMETERS_INCREMENTER = "rttp.jobParametersIncrementer";

   @SuppressWarnings({ "unchecked" })
   @Override
   public void execute(JobExecutionContext context) throws JobExecutionException {
      JobDataMap jobDataMap = context.getMergedJobDataMap();
      String jobName = (String) jobDataMap.get(JobsUtility.JOB_NAME);
      LOGGER.info("Quartz trigger fired for spring batch jobName=%s", jobName);
      JobParameters jobParameters = JobsUtility.covertJobParameters(jobDataMap, true);
      //
      JobLauncher jobLauncher = (JobLauncher) ACCESSOR.getBean(jobDataMap.getString(JOB_LAUNCHER));
      JobRegistry jobLocator = (JobRegistry) ACCESSOR.getBean(jobDataMap.getString(JOB_LOCATOR));
      JobParametersIncrementer jobParametersIncrementer = jobDataMap
               .containsKey(JOB_PARAMETERS_INCREMENTER) ? ACCESSOR.getBean(
               jobDataMap.getString(JOB_PARAMETERS_INCREMENTER), JobParametersIncrementer.class)
               : null;
      //
      if (jobParametersIncrementer == null) {
         jobParametersIncrementer = new QuartzSchedulerParametersIncrementer(context.getFireTime());
      }
      jobParameters = jobParametersIncrementer.getNext(jobParameters);
      StopWatch stopWatch = new Log4JStopWatch(
               String.format("rttp.spring.batch.runner_%s", jobName));
      try {
         LOGGER.info(
                  "launching spring batch job %s using[job_launcher=%s, job_locator=%s, jobParameters=%s]",
                  jobName, jobLauncher, jobLocator, jobParameters);
         jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
      } catch (org.springframework.batch.core.JobExecutionException e) {
         LOGGER.warn(e);
         throw new JobExecutionException(e);
      } finally {
         stopWatch.setMessage(String.format("job %s finished execution", jobName));
         stopWatch.stop();
      }
   }
}
